package com.atguigu.app.func;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.bean.TableProcess;
import com.atguigu.common.Constant;
import com.atguigu.utils.JdbcUtil;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.*;

/**
 * Routes change records from the main stream to their DWD sink according to
 * per-table configuration ({@link TableProcess}).
 *
 * <p>Configuration comes from two places:
 * <ul>
 *   <li>a snapshot read from the {@code table_process} table in {@link #open(Configuration)},
 *       used as a fallback when a main-stream record is processed before the matching
 *       broadcast configuration has been received;</li>
 *   <li>the broadcast stream of configuration change events, stored in broadcast state and
 *       preferred over the snapshot when both contain the key.</li>
 * </ul>
 *
 * <p>Configuration entries are keyed by {@code sourceTable + "-" + sourceType}.
 */
public class DwdTableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {

    /** Separator between source table and operation type in a configuration key. */
    private static final String KEY_SEPARATOR = "-";

    /** Only configuration rows with this sink type are handled by this function. */
    private static final String SINK_TYPE_DWD = "dwd";

    private final MapStateDescriptor<String, TableProcess> mapStateDescriptor;

    /** Snapshot of the DWD configuration; built in {@link #open(Configuration)}, hence transient. */
    private transient Map<String, TableProcess> tableProcessHashMap;

    /**
     * @param mapStateDescriptor descriptor of the broadcast state that holds the configuration
     */
    public DwdTableProcessFunction(MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
        this.mapStateDescriptor = mapStateDescriptor;
    }

    /**
     * Preloads every DWD configuration row into the local fallback map.
     *
     * @throws Exception if the connection cannot be opened or the query fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        tableProcessHashMap = new HashMap<>();
        // NOTE(review): credentials are hard-coded here; consider moving them to configuration.
        // try-with-resources guarantees the connection is closed even when the query throws.
        try (Connection connection = DriverManager.getConnection(Constant.MYSQL_URL, "root", "000000")) {
            List<TableProcess> tableProcessList = JdbcUtil.queryList(connection,
                    "select * from table_process where sink_type='dwd'",
                    TableProcess.class,
                    true);
            for (TableProcess tableProcess : tableProcessList) {
                tableProcessHashMap.put(
                        buildKey(tableProcess.getSourceTable(), tableProcess.getSourceType()),
                        tableProcess);
            }
        }
    }

    /**
     * Looks up the configuration for the record's {@code table} and {@code type}, keeps only the
     * configured columns of its {@code data} object, adds the target {@code topic} and emits it.
     * Records without a matching configuration are logged and dropped.
     *
     * <p>Sample value:
     * <pre>
     * {"database":"gmall-220623-flink","table":"comment_info","type":"insert","ts":1669162958,
     *  "xid":1111,"xoffset":13941,
     *  "data":{"id":1595211185799847960,"user_id":119,"sku_id":31,"spu_id":10,"order_id":987,
     *          "appraise":"1204","create_time":"2022-08-02 08:22:38","operate_time":null}}
     * </pre>
     */
    @Override
    public void processElement(JSONObject value, BroadcastProcessFunction<JSONObject, String, JSONObject>.ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {

        ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        String table = value.getString("table");
        String key = buildKey(table, value.getString("type"));

        // Broadcast state wins; the preloaded snapshot is only a fallback.
        TableProcess tableProcess = broadcastState.get(key);
        if (tableProcess == null) {
            tableProcess = tableProcessHashMap.get(key);
        }

        if (tableProcess == null) {
            System.out.println(table + "配置信息不存在：" + key);
            return;
        }

        // Column filtering.
        filterColumns(value.getJSONObject("data"), tableProcess.getSinkColumns());
        // Attach the sink topic and emit.
        value.put("topic", tableProcess.getSinkTable());
        out.collect(value);
    }

    /**
     * Removes from {@code data} every field whose name is not listed in {@code sinkColumns}.
     * Column names in {@code sinkColumns} are trimmed before comparison. When either argument
     * is {@code null} nothing is filtered.
     *
     * @param data        e.g. {"id":"1001","tm_name":"atguigu","logo_url":"/xx/xx"}
     * @param sinkColumns comma-separated whitelist, e.g. "id,tm_name"
     */
    private void filterColumns(JSONObject data, String sinkColumns) {
        if (data == null || sinkColumns == null) {
            return;
        }

        // A set gives constant-time membership checks for each field of the record.
        Set<String> allowedColumns = new HashSet<>();
        for (String column : sinkColumns.split(",")) {
            allowedColumns.add(column.trim());
        }

        data.entrySet().removeIf(entry -> !allowedColumns.contains(entry.getKey()));
    }

    /**
     * Applies one configuration change event to the broadcast state.
     *
     * <p>The DWD entry described by the {@code before} image (if any) is removed from both the
     * broadcast state and the preloaded snapshot, so that a deleted row, a changed key, or a row
     * that stopped being a DWD configuration no longer matches. Unless the event is a delete
     * ({@code op == "d"}), the DWD entry described by the {@code after} image is then stored.
     *
     * <p>Sample value:
     * <pre>
     * {"before":null,
     *  "after":{"source_table":"base_category3","sink_type":"dim","sink_table":"dim_base_category3",
     *           "sink_columns":"id,name,category2_id","sink_pk":"id","sink_extend":null},
     *  "source":{"connector":"mysql","db":"gmall-220623-config","table":"table_process"},
     *  "op":"r","ts_ms":1669162876406,"transaction":null}
     * </pre>
     */
    @Override
    public void processBroadcastElement(String value, BroadcastProcessFunction<JSONObject, String, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {

        BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);

        JSONObject jsonObject = JSONObject.parseObject(value);
        if (jsonObject == null) {
            return;
        }
        boolean isDelete = "d".equals(jsonObject.getString("op"));

        // Drop whatever the previous row image registered.
        TableProcess before = parseDwdConfig(jsonObject.getString("before"));
        if (before != null) {
            String oldKey = buildKey(before.getSourceTable(), before.getSourceType());
            broadcastState.remove(oldKey);
            tableProcessHashMap.remove(oldKey);
        }

        if (isDelete) {
            return;
        }

        // Register the new row image.
        TableProcess after = parseDwdConfig(jsonObject.getString("after"));
        if (after != null) {
            broadcastState.put(buildKey(after.getSourceTable(), after.getSourceType()), after);
        }
    }

    /**
     * Parses a row image into a {@link TableProcess}.
     *
     * @return the parsed configuration, or {@code null} when the image is absent or is not a
     *         DWD configuration
     */
    private static TableProcess parseDwdConfig(String rowJson) {
        if (rowJson == null) {
            return null;
        }
        TableProcess tableProcess = JSONObject.parseObject(rowJson, TableProcess.class);
        if (tableProcess == null || !SINK_TYPE_DWD.equals(tableProcess.getSinkType())) {
            return null;
        }
        return tableProcess;
    }

    /** Builds the configuration lookup key {@code sourceTable-sourceType}. */
    private static String buildKey(String sourceTable, String sourceType) {
        return sourceTable + KEY_SEPARATOR + sourceType;
    }
}
